SageMakerの前処理をGlue開発エンドポイントで実行させる:Amazon SageMaker Advent Calendar 2018
概要
こんにちは、データインテグレーション部のyoshimです。
この記事は「クラスメソッド Amazon SageMaker Advent Calendar」の9日目の記事となります。
今回はSageMakerの前処理を「AWS Glueの開発エンドポイント」で実行させてみます。
下記を参考に進めていきます。
Managing Notebooks
Working with Notebooks on the AWS Glue Console
AWS Glue コンソールでの開発エンドポイントの操作
以前に、「SageMaker-EMR-Glue-S3」と接続させて前処理をする構成についてのエントリーをご紹介したのですが、それと少し似ています。今回はEMRを使わずに「Glue開発エンドポイントのサーバーレスApache Spark環境」を利用することとなります。
目次
1.最初に
SageMakerを利用していて「データの前処理をどこで実行するか」、という点について悩んだことはないでしょうか?例えば、大きな組織でSageMakerを利用していて「ユーザーごとにノートブックインスタンスを用意」していた場合で考えてみましょう。
「前処理のためにマシーンパワーが必要だから、全員分のインスタンスをスケールアップする」というのはコスト的に厳しい一方、とはいえ前処理に単純に時間がかかるとそれはそれで業務が滞る、といったことになるかと思います。
今回ご紹介する構成では、SageMakerノートブックインスタンスからGlue開発エンドポイントにアクセスすることで、上記のように「処理の重い前処理をGlue開発エンドポイント」にて処理させることができます。
(でかいデータをSageMakerノートブックインスタンスに持ってきて処理、とかしていた部分が楽になるのでどちらかと言うと「データ探索の手間が省ける」といったメリットの方がでかいかもしれません)
詳細については下記をご参照ください
AWS Glue が Amazon SageMaker ノートブックの開発エンドポイントへの接続サポートを開始
2.Glueの準備
まずは「Glue側」の準備をします。
下記の5点を対応する必要があります。
- IAMロールの設定(Glue開発エンドポイントからS3、SageMakerインスタンス、Cloudwatch等にアクセスできるように)
- データベースの作成(Glueデータカタログのデータベース)
- クローラの作成(Glueデータカタログの更新)
- Glue開発エンドポイントの作成(今回メインで稼働するインスタンス)
- SageMakerノートブックインスタンスの作成(Glue開発エンドポイントと接続できるようにしたインスタンス)
2-1.IAMロールの設定
利用するS3バケットにアクセスできるように設定しましょう。
今回は「AmazonS3FullAccess」、「AWSGlueServiceRole」の2つのポリシーをアタッチして先に進みます。
・今回用意したIAMロール
2-2.データベースの作成
Glueデータカタログの「データベース」を作成します。
後ほどSageMakerノートブックインスタンスからアクセスする際は、このデータベースを指定することになります。
作成手順については、まずコンソール画面上から「Glue」の画面に移動し、「データベース」、「データベースの追加」をクリックします。
今回は「legislators」と指定しました。
2-3.クローラの作成
続いて、クローラの作成です。クローラの役割はソースデータが更新された場合に、「Glueデータベース」を更新することです。
「クローラ」から「クローラの追加」をクリックします。
続いて、クローラの名前を「crawler_legislators」等の適当なものに設定し、
データの格納先を指定します。今回はAWSがサンプルとして用意しているデータセットを利用するために「s3://awsglue-datasets/examples/us-legislators/all」と指定します。
今回は上記のデータセットのみを利用するので、ここは「いいえ」にして「次へ」をクリックします。
「2-1.IAMロールの設定」で用意したIAMロールを利用します。このIAMロールは「AmazonS3FullAccess」、「AWSGlueServiceRole」ポリシーをアタッチしているだけのロールです。
今回は、データソースの変更もなく一回実行できればいいので、「オンデマンドで実行」を指定します。
先ほど作成した「legislators」データベースを出力先として指定します。
最後に、諸々の設定に間違いがないかを確認し、問題なければ「完了」をクリックしてクローラを作成します。
クローラの作成が完了したら、クローラを実行しましょう。
下記のように、jsonファイルが認識されてテーブルとして情報が取得できています。
2-4.Glue開発エンドポイントの作成
続いて、実行環境となる「Glue開発エンドポイント」を作成します。
「開発エンドポイント」から「エンドポイントの追加」をクリックし、
開発エンドポイント名、IAMロールを指定します。
IAMロールは先ほどクローラに設定したものと同じものを利用しています。
今回はS3のデータのみを参照するので、「ネットワーキング情報をスキップ」として先に進みます。
今回はチュートリアルとして、とりあえず実行してみることを目的としているので、SSHキーは特に追加せずに「次へ」をクリックします
最後に、全体を確認して問題がなければ「完了」を実行します。
「プロビジョニングのステータス」が「READY」になったらエンドポイントの準備完了です。エンドポイントの準備ができるまで数分かかるので、ゆっくり待ちましょう。
ちなみに、料金は「0.44$/DAU*時間」(執筆時点)とのことなので、今回私が用意したエンドポイントの場合だと1時間で2.2$くらいかかるみたいです。
引用:Glue開発エンドポイントの料金について
2-5.SageMakerノートブックインスタンスの作成
最後に「2-4.Glue開発エンドポイントの作成」で作成したGlue開発エンドポイントと接続できるSageMakerノートブックインスタンスを作成します。
このインスタンスは「Glue開発エンドポイント」と同じリージョンに作成する必要があります。
まずは「Notebooks」から「ノートブックサーバーの作成」をクリックします。
続いて、ノートブックインスタンスの情報等について指定します。
SageMakerノートブックインスタンス立ち上げに利用するIAMロールについては、ここで新規作成しました。
ノートブックインスタンスがGlue開発エンドポイントと紐づいた状態で作成されます。
ちなみに、当然ですがこのインスタンスは「SageMaker」のコンソール画面上からも確認できます。
また、「ライフサイクル設定」を見てみると、新しく設定が追加されていました。
この「ライフサイクル設定」でノートブックインスタンスが起動するたびに、設定した「Glue開発エンドポイント」に接続しているようです。
ノートブックインスタンスが作成できたら、早速ノートブックインスタンスを開いてみましょう。
これで実行環境の準備は完了です。
3.SageMakerノートブックインスタンスでの処理
では、実際にSageMakerノートブックインスタンスから前処理を実行していきます。
やることは、「S3に保存しているjsonファイル」を「Glue開発エンドポイントで処理」させ、その結果を「S3に出力する」というものです。
ここまでと同じ手順で進めている場合は「Joining, Filtering, and Loading Relational Data with AWS Glue.ipynb」というファイルが最初からロードされているかと思いますが、こちらのスクリプトに記述されている内容をベースに解説を続けていこうと思います。
本スクリプト内の処理内容はコード例: データの結合と関係付けにも紹介されています。
3-1.必要なモジュールのインポート
まずは必要なモジュールをインポートします。
awsglue,pysparkモジュールをメインに使います。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())
3-2.データの確認
続いて、データの中身を確認してみましょう。
Glueのダイナミックフレームはほとんど触ったことが無かったので少し困惑しましたが、下記2点の説明がわかりやすいのでお勧めです。
まずは指定したテーブルの基本情報をprintSchema関数を使って調べます。
対象データの基本的な定義について調べるこの操作は、利用頻度が高そうです。
orgs = glueContext.create_dynamic_frame.from_catalog(database="legislators", table_name="organizations_json") print "Count: ", orgs.count() orgs.printSchema()
Count: 13 root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string
また、下記のようなコマンドで実際にどんな値が入っているのか、を簡単に目視で確認することもできます。
orgs.toDF().show()
+--------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+-----+-----------+ | identifiers| other_names| id|classification| name| links| image|seats| type| +--------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+-----+-----------+ |[[wikidata,Q36829...|[[en,multilingual...| party/al| party| AL| null| null| null| null| | [[wikidata,Q29552]]|[[zh,multilingual...| party/democrat| party| Democrat|[[website,http://...|https://upload.wi...| null| null| |[[wikidata,Q65407...|[[en,multilingual...|party/democrat-li...| party| Democrat-Liberal|[[website,http://...| null| null| null| | [[wikidata,Q11701]]| null|d56acebe-8fdc-47b...| legislature|House of Represen...| null| null| 435|lower house| |[[wikidata,Q327591]]|[[zh-hans,multili...| party/independent| party| Independent| null| null| null| null| |[[wikidata,Q10765...|[[de,multilingual...|party/new_progres...| party| New Progressive|[[website,http://...|https://upload.wi...| null| null| |[[wikidata,Q199319]]|[[es,multilingual...|party/popular_dem...| party| Popular Democrat|[[website,http://...| null| null| null| | [[wikidata,Q29468]]|[[zh,multilingual...| party/republican| party| Republican|[[website,http://...|https://upload.wi...| null| null| |[[wikidata,Q51630...|[[en,multilingual...|party/republican-...| party|Republican-Conser...|[[website,http://...| null| null| null| | [[wikidata,Q29552]]|[[zh,multilingual...| party/democrat| party| Democrat|[[website,http://...|https://upload.wi...| null| null| |[[wikidata,Q327591]]|[[zh-hans,multili...| party/independent| party| Independent| null| null| null| null| | [[wikidata,Q29468]]|[[zh,multilingual...| party/republican| party| Republican|[[website,http://...|https://upload.wi...| null| null| | [[wikidata,Q66096]]| null|8fa6c3d2-71dc-478...| legislature| Senate| null| null| 100|upper house| +--------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+-----+-----------+
3-3.前処理
データの確認の次は前処理に移ります。
今回は「不要なフィールドの削除」、「フィールド名の変更」、「複数のデータフレームをJOIN」といった処理をします。
# フィールド名の変更と不要なフィールドの削除 orgs = orgs.drop_fields(['other_names','identifiers']).rename_field('id', 'org_id').rename_field('name', 'org_name') # JOIN(サブクエリ的処理させて、最後に不要(重複した)フィールドを削除している) l_history = Join.apply(orgs, Join.apply(persons, memberships, 'id', 'person_id'), 'org_id', 'organization_id').drop_fields(['person_id', 'org_id'])
3-4.処理結果をS3に出力
「l_history」データフレームを「parquet形式」のファイルとしてS3に出力します。
出力方法もいくつかありますが、それぞれユースケースに合わせて利用しましょう。
・write_dynamic_frame.from_options()
glueContext.write_dynamic_frame.from_options(frame = l_history, connection_type = "s3", connection_options = {"path": "s3://bucket/your_dir"}, format = "parquet")
・
s_history = l_history.toDF().repartition(1) s_history.write.parquet('s3://bucket/your_dir')
l_history.toDF().write.parquet('s3://bucket/your_dir', partitionBy=['org_name'])
S3に「parquet形式」でファイル出力まで完了したら、後は学習するだけですね。
3-5.Relational DBに書き込む(おまけ)
今回は試していないのでコードの紹介のみにとどめますが、データフレームをDBに書き込むこともできるようです。
以下はサンプルコードですが、もし実際に書き込む場合はGlueの「データベースへの接続設定」をする必要があります。
for df_name in dfc.keys(): m_df = dfc.select(df_name) print "Writing to Redshift table: ", df_name glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df, catalog_connection = "redshift3", connection_options = {"dbtable": df_name, "database": "testdb"}, redshift_tmp_dir = "s3://glue-sample-target/temp-dir/")
4.まとめ
今回はSageMakerの前処理を実行する環境として、「AWS Glueの開発エンドポイント」を指定してみました。
前処理にマシーンパワーが欲しい場合でも、「ユーザーごとにノートブックインスタンスを用意しているため、コスト的に厳しい」といった場合もあるかと思いますが、そのような場合には今回のように「Glue開発エンドポイント」を「共通の処理環境」にすることで、費用対効果よく処理能力を向上できるかもしれません。
また、Glueを参照しているので、分析者が「S3からデータを持ってきて...」とかをする手間が省け、開発の手間が削減できます。
ただ、今回の構成では「画像や動画」等のデータは処理できないので、それらのデータを処理する場合は異なる方法を選択する必要があります。
本エントリーの紹介内容は以上です。
最後までご覧いただきありがとうございました。